在上一篇我們實作了基礎的 Speed Layer 架構,包含 Source、Sink、SimpleStreamingEngine 三個核心組件。一開始系統運作良好,Consumer 順利處理訂單數據,一切看起來很完美。
但當系統面臨真正的挑戰時 - 比如雙 11 大促銷 - 問題就浮現了:
這時候我們發現,效能瓶頸是 Stream Processing 系統必須面對的核心挑戰。
透過實際經驗,我們歸納出 Consumer 最常見的效能瓶頸。當然,還有其他較少見的問題(如 GC 調優、記憶體洩漏、網路配置等),但以下四種是最主要且最容易遇到的:
問題描述:每筆訊息都執行一次 INSERT,然後立即 commit,對資料庫造成大量小型交易負擔。
影響:資料庫頻繁處理小型交易,無法發揮批量處理的效率優勢。
解決方案:實作批量寫入機制,一次收集數百筆記錄再執行批量 INSERT,大幅減少 transaction commit 次數。
問題描述:每筆訊息都需要進行 JSON 反序列化處理。
影響:CPU 資源被大量 JSON 解析操作佔用,降低整體處理效率。
解決方案:
問題描述:Topic 的 Partition 數量過少,限制了 Consumer 的平行處理能力。
影響:即使增加 Consumer 實例,也無法提升整體吞吐量。
解決方案:適當增加 Partition 數量,讓系統能夠水平擴展。
問題描述:Broker、Consumer、資料庫之間的網路距離造成額外延遲。
影響:每個網路往返都增加處理時間。
解決方案:盡量將相關組件部署在相近的網路位置。
經驗分享:在實務中,問題 2 和 3 相對容易解決,問題 4 在大多數企業基礎設施中也不會是主要瓶頸。真正棘手的往往是問題 1(資料庫寫入速度),這也是我們接下來要深入探討的重點。
在解決資料庫寫入瓶頸之前,我們可以先從另一個角度思考:減少需要處理的資料量。
核心概念:在資料進入資料庫之前,先過濾掉不需要的資料,只處理真正有價值的訊息。
舉例來說,如果訂單狀態為 'removed' 的資料對業務沒有意義,我們可以在 Consumer 階段就將這些資料過濾掉,避免佔用後續的處理資源。
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
為了實現靈活的資料過濾功能,我們引入 DataFrame 概念。這個抽象層位於 Source 和 Sink 之間,負責資料的轉換和過濾操作。
Source → DataFrame → Sink
(過濾、轉換)
DataFrame 的核心功能:
Step 1:SimpleDataFrame 初始化
class SimpleDataFrame:
def __init__(self, name: str = "dataframe"):
self.name = name
self._filters = [] # 過濾函數列表
self._sinks = [] # Sink 列表
核心概念:
Step 2:filter 方法核心邏輯
def filter(self, func) -> 'SimpleDataFrame':
# 創建新的 DataFrame 實例以支援鏈式調用
new_df = SimpleDataFrame(f"{self.name}_filtered")
new_df._filters = self._filters.copy() # 複製現有過濾器
new_df._filters.append(func) # 添加新過濾器
new_df._sinks = self._sinks.copy() # 複製現有 Sink
return new_df
設計重點:
df.filter(...).filter(...)
Step 3:sink 方法
def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
self._sinks.append(sink)
return self # 返回自身支援鏈式調用
核心功能:將處理後的資料發送到指定的 Sink。
Step 4:process_message 核心處理邏輯
def process_message(self, message) -> bool:
# 依序檢查所有過濾器
for filter_func in self._filters:
if not filter_func(message):
return False # 被過濾掉
# 通過所有過濾器,發送到所有 Sink
for sink in self._sinks:
sink.write(message)
return True
處理流程:
# 簡單的 DataFrame 類別,支援 filter 算子
import logging
from typing import Any, Dict, Callable, List
from .sink import BaseSink
logger = logging.getLogger(__name__)
class SimpleDataFrame:
"""
簡單的 DataFrame 類別,支援基本的流處理操作
"""
def __init__(self, name: str = "dataframe"):
self.name = name
self._filters: List[Callable[[Dict[str, Any]], bool]] = [] # 過濾函數列表
self._sinks: List[BaseSink] = [] # Sink 列表
def filter(self, func: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
"""
添加過濾條件
:param func: 過濾函數,接收 message 字典,返回 True 表示通過過濾
:return: 返回新的 SimpleDataFrame 實例以支援鏈式調用
"""
# 創建新的 DataFrame 實例以支援鏈式調用
new_df = SimpleDataFrame(f"{self.name}_filtered")
new_df._filters = self._filters.copy() # 複製現有的過濾器
new_df._filters.append(func) # 添加新的過濾器
new_df._sinks = self._sinks.copy() # 複製現有的 Sink
logger.debug(f"Added filter to {self.name}")
return new_df
def __getitem__(self, condition: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
"""
使用 [] 語法添加過濾條件,等同於 filter() 方法
:param condition: 過濾函數,接收 message 字典,返回 True 表示通過過濾
:return: 返回新的 SimpleDataFrame 實例以支援鏈式調用
"""
return self.filter(condition)
def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
"""
添加 Sink
:param sink: Sink 實例
:return: 返回自身以支援鏈式調用
"""
self._sinks.append(sink)
logger.debug(f"Added sink {sink.name} to {self.name}")
return self
def process_message(self, message: Dict[str, Any]) -> bool:
"""
處理單個訊息,通過所有過濾器後發送到 Sink
:param message: 訊息字典
:return: True 表示訊息通過了所有過濾器
"""
# 依序檢查所有過濾器
for filter_func in self._filters:
try:
if not filter_func(message):
logger.debug("Message filtered out")
return False
except Exception as e:
logger.error(f"Error in filter: {e}")
return False
# 如果通過所有過濾器,發送到所有 Sink
for sink in self._sinks:
try:
sink.write(message)
except Exception as e:
logger.error(f"Error writing to sink {sink.name}: {e}")
return True
我們需要調整 SimpleStreamingEngine 的邏輯,讓它能夠支援 DataFrame 的概念。新的架構中,Source 會先將訊息交給對應的 DataFrame 處理,通過過濾與轉換後,再統一輸出到 Sink。
原本的架構:
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── Central Manager
│ │
│ +add_source() │
│ +add_sink() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Sink │
│ │ │ │
│ KafkaSource │ │PostgreSQLSink│
└──────────────┘ └──────────────┘
升級後的架構:
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── Central Manager
│ │
│ +add_source() │
│ +dataframe() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Source │───▶│ DataFrame │───▶│ Sink │
│ │ │ │ │ │
│ KafkaSource │ │ +filter() │ │PostgreSQLSink│
└──────────────┘ └──────────────┘ └──────────────┘
Step 1:新增 DataFrame 支援
升級後的 SimpleStreamingEngine 需要管理 DataFrame,而不是直接管理 Sink:
class SimpleStreamingEngine:
def __init__(self, name: str = "simple-streaming-engine"):
self.name = name
self._sources = []
self._dataframes = [] # 新增:管理 DataFrame 列表
self._source_dataframe_map = {} # 新增:Source 到 DataFrame 的映射
核心變化:
Step 2:dataframe 方法 - 建立 Source 和 DataFrame 的關聯
def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
df = SimpleDataFrame(f"df-{source.name}")
self._dataframes.append(df)
self._source_dataframe_map[source] = df # 建立映射
self.add_source(source)
return df
關鍵功能:
Step 3:升級 run 方法
def run(self):
# 設定所有 DataFrame 中的 Sink
all_sinks = []
for df in self._dataframes:
all_sinks.extend(df._sinks)
for sink in all_sinks:
sink.setup()
# 為每個 Source 設定訊息處理器
for source in self._sources:
source.message_handler = self._create_message_handler(source)
source.run()
處理邏輯調整:
Step 4:新的訊息處理器
def _create_message_handler(self, source: BaseSource):
def message_handler(message):
if source in self._source_dataframe_map:
df = self._source_dataframe_map[source]
df.process_message(message) # 交給 DataFrame 處理
return message_handler
核心改變:
import logging
from typing import List
from .source import BaseSource
from .dataframe import SimpleDataFrame
logger = logging.getLogger(__name__)
class SimpleStreamingEngine:
"""
支援 DataFrame 的簡單流處理引擎
"""
def __init__(self, name: str = "simple-streaming-engine"):
self.name = name
self._sources: List[BaseSource] = []
self._dataframes: List[SimpleDataFrame] = []
self._source_dataframe_map = {}
def add_source(self, source: BaseSource):
"""
添加 Source 到流處理引擎
"""
self._sources.append(source)
def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
"""
創建 DataFrame 並關聯到 Source
"""
df = SimpleDataFrame(f"df-{source.name}")
self._dataframes.append(df)
self._source_dataframe_map[source] = df
self.add_source(source)
return df
def run(self):
"""
啟動流處理引擎,開始處理數據流
"""
# 設定所有 DataFrame 中的 Sink
all_sinks = []
for df in self._dataframes:
all_sinks.extend(df._sinks)
for sink in all_sinks:
sink.setup()
# 為每個 Source 設定訊息處理器
for source in self._sources:
source.message_handler = self._create_message_handler(source)
source.run()
def _create_message_handler(self, source: BaseSource):
"""
創建訊息處理器,將訊息路由到對應的 DataFrame
"""
def message_handler(message):
if source in self._source_dataframe_map:
df = self._source_dataframe_map[source]
df.process_message(message)
return message_handler
現在讓我們看看如何實際使用升級後的架構來過濾無效資料。
Step 1:創建 SimpleStreamingEngine
engine = SimpleStreamingEngine(name="orders-processing-engine")
Step 2:創建 Source 並建立 DataFrame
orders_source = SimpleKafkaSource(name="orders-source", topic="orders")
sdf = engine.dataframe(source=orders_source)
Step 3:添加過濾條件
# 過濾掉 removed 訂單
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')
# 或者使用 [] 語法(兩種寫法效果相同)
sdf = sdf[lambda msg: msg.get('value', {}).get('status') != 'removed']
Step 4:添加 Sink 並啟動
pg_sink = SimplePostgreSQLSink(...)
sdf.sink(pg_sink)
engine.run() # 開始處理:Kafka → 過濾 → PostgreSQL
# 1. 創建 SimpleStreamingEngine
engine = SimpleStreamingEngine(name="orders-processing-engine")
# 2. 創建 Kafka Source
orders_source = SimpleKafkaSource(name="orders-source", topic="orders")
# 3. 創建 DataFrame 並添加過濾器
sdf = engine.dataframe(source=orders_source)
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')
# 4. 添加 PostgreSQL Sink
pg_sink = SimplePostgreSQLSink(
name="orders-sink",
table_name="valid_orders",
# ... 其他參數
)
# 5. 組裝並啟動
sdf.sink(pg_sink)
engine.run() # 開始處理:Kafka → 過濾 → PostgreSQL
資料流向:
KafkaSource → DataFrame.filter() → PostgreSQLSink
這樣的設計讓過濾邏輯變得清晰且易於擴展,我們可以輕鬆添加更多過濾條件或修改處理邏輯。
今天我們探討了 Speed Layer 中常見的效能瓶頸問題,並提出了「先過濾再入庫」的優化思路。
主要收穫:
通過 Source-DataFrame-Sink 的架構設計,我們建立了:
雖然過濾可以減少資料量,但當業務持續成長時,我們仍然需要面對大量有效資料的處理挑戰。下一篇我們將深入探討批量寫入機制,學習如何通過批量處理技術大幅提升資料庫寫入效能,讓你的 Speed Layer 在高峰期依然能穩定運行。